分类
联系方式
  1. 新浪微博
  2. E-mail

DartVM ReceivePort

介绍

Isolate 间通过 ReceivePort 和 SendPort 实现通信。这种 Port 机制,在 DartVM 底层有着更加深入的应用,这种机制也与 Dart 的消息队列机制关联非常密切。

类关系

ReceivePort 相关联的类有点多,并且跨 Dart 和 C/C++,看着有点乱,梳理如下图:

ReceivePort Dart 抽象类

该类位于 sdk/lib/isolate/isolate.dart,是一个抽象类。

类注释

与 SendPort 一起使用,是 isolate 间通信的唯一方式。

ReceivePort 拥有一个 sendPort getter 方法,用于获取 SendPort。通过 SendPort,能够向其关联的 ReceivePort 发送消息。ReceivePort 实现了 Stream 接口,通过对 ReceivePort 进行 listen,能够进行数据监听。

从 Stream 角度,ReceivePort 是一个非广播 stream,只能有一个 listener 监听消息。不过,通过包装 Stream.asBroadcastStream,可以实现广播。

工厂方法1

开启一个长期活跃的端口,用于接收消息。

external factory ReceivePort([String debugName = '']);

其中:

  • debugName:参与用于调试工具展示
  • 如果取消订阅,端口也会关闭

工厂方法2

基于 RawReceivePort 创建 ReceivePort。

external factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort);

工厂方法实现

两个工厂方法都是只有 external 声明,具体实现位于 sdk/lib/_internal/lib/isolate_patch.dart:

@patch
class ReceivePort {
    @patch
    factory ReceivePort([String debugName = '']) =>
        new _ReceivePortImpl(debugName);

    @patch
    factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort) {
        return new _ReceivePortImpl.fromRawReceivePort(rawPort);
    }
}

可以看到,通过工厂方法,指向了具体的实现类 _ReceivePortImpl。

listen 监听

ReceivePort 实现了 stream 接口,因此提供 listen 方法用于进行监听。

StreamSubscription<dynamic> listen(
    viod onData(var message)?,
    {
        Function? onError,
        void onDone()?
        bool? cancelOnError
    });

其中:

  • onError 和 cancelOnError 是不起作用的,因为 ReceivePort 永远不会收到 error
  • onDone 回调会在 close 调用后调用

close 关闭

关闭 receive port。

关闭之后,就收不到后续消息了。

void close();

sendPort getter

获取 sendPort:

SendPort get sendPort;

拿到 sendPort,才能向 receive port 发送消息。

_ReceivePortImpl

该类位于 sdk/lib/_internal/vm/lib/isolate_patch.dart,具体实现如下:

class _ReceivePortImpl extends Stream implements ReceivePort {
  _ReceivePortImpl([String debugName = ''])
      : this.fromRawReceivePort(new RawReceivePort(null, debugName));

  _ReceivePortImpl.fromRawReceivePort(this._rawPort)
      : _controller = new StreamController(sync: true) {
    _controller.onCancel = close;
    _rawPort.handler = _controller.add;
  }

  SendPort get sendPort {
    return _rawPort.sendPort;
  }

  StreamSubscription listen(void onData(var message)?,
      {Function? onError, void onDone()?, bool? cancelOnError}) {
    return _controller.stream.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }

  close() {
    _rawPort.close();
    _controller.close();
  }

  final RawReceivePort _rawPort;
  final StreamController _controller;
}

两个构造方法

可以看到都是指向了 fromRawReceivePort。

对于默认构造函数,创建了一个新的 RawReceivePort 实例,并存入 _rawPort 成员。

构造函数中还有一点,是创建了 Stream 的控制器。并且把 streamController 的 add 方法引用传给 _rawPort 的 handler。

这样 RawReceivePort 只要收到信息,就会自动抛到 Stream 上面去。

从中还能看出,在 Dart 侧创建一个 ReceivePort,其实内部是需要走到底层的。

RawReceivePort

这部分内容单独放到 DartVM RawReceivePort 进行总结。